package com.tyc.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;
/**
 * Union merges two or more streams into one new stream. All input streams must
 * share the same element type, and the resulting stream keeps that same type.
 * Union: DataStream* → DataStream
 * @author tyc
 * @version 1.0
 * @date 2022-11-16 17:58:52
 */
public class Transform_5_Union {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a single, deterministic task.
        env.setParallelism(1);

        DataStreamSource<String> sourceA = env.fromCollection(
                Arrays.asList("hello flink",
                        "hello java",
                        "hi program",
                        "hello",
                        "java"));

        // Second source with identical content: after the union, every word
        // shows up twice as often as it would from a single source.
        DataStreamSource<String> sourceB = env.fromCollection(
                Arrays.asList("hello flink",
                        "hello java",
                        "hi program",
                        "hello",
                        "java"));

        // union() requires all inputs to share the element type and yields one
        // stream of that same type.
        DataStream<String> merged = sourceA.union(sourceB);

        // Classic word count over the merged stream:
        //   1. split each line into words
        //   2. key by the word itself
        //   3. keep a running sum of the per-word counters
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCounts = merged
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .keyBy(pair -> pair.f0)
                .sum(1);

        wordCounts.print();

        env.execute();
    }
}
